home *** CD-ROM | disk | FTP | other *** search
/ Clickx 115 / Clickx 115.iso / software / tools / windows / tails-i386-0.16.iso / live / filesystem.squashfs / usr / share / perl5 / IO / Multiplex.pm < prev    next >
Encoding:
Perl POD Document  |  2008-09-15  |  29.1 KB  |  1,093 lines

  1. package IO::Multiplex;
  2.  
  3. =head1 NAME
  4.  
  5. IO::Multiplex - Manage IO on many file handles
  6.  
  7. =head1 SYNOPSIS
  8.  
  9.   use IO::Multiplex;
  10.  
  11.   my $mux = new IO::Multiplex;
  12.   $mux->add($fh1);
  13.   $mux->add(\*FH2);
  14.   $mux->set_callback_object(...);
  15.   $mux->listen($server_socket);
  16.   $mux->loop;
  17.  
  18.   sub mux_input {
  19.     ...
  20.   }
  21.  
  22. C<IO::Multiplex> is designed to take the effort out of managing
  23. multiple file handles.  It is essentially a really fancy front end to
  24. the C<select> system call.  In addition to maintaining the C<select>
  25. loop, it buffers all input and output to/from the file handles.  It
  26. can also accept incoming connections on one or more listen sockets.
  27.  
  28. =head1 DESCRIPTION
  29.  
  30. It is object oriented in design, and will notify you of significant events
  31. by calling methods on an object that you supply.  If you are not using
  32. objects, you can simply supply C<__PACKAGE__> instead of an object reference.
  33.  
  34. You may have one callback object registered for each file handle, or
  35. one global one.  Possibly both -- the per-file handle callback object
  36. will be used instead of the global one.
  37.  
  38. Each file handle may also have a timer associated with it.  A callback
  39. function is called when the timer expires.
  40.  
  41. =head2 Handling input on descriptors
  42.  
  43. When input arrives on a file handle, the C<mux_input> method is called
  44. on the appropriate callback object.  This method is passed three
  45. arguments (in addition to the object reference itself of course):
  46.  
  47. =over 4
  48.  
  49. =item 1
  50.  
  51. a reference to the mux,
  52.  
  53. =item 2
  54.  
  55. A reference to the file handle, and
  56.  
  57. =item 3
  58.  
  59. a reference to the input buffer for the file handle.
  60.  
  61. =back
  62.  
  63. The method should remove the data that it has consumed from the
  64. reference supplied.  It may leave unconsumed data in the input buffer.
  65.  
  66. =head2 Handling output to descriptors
  67.  
  68. If C<IO::Multiplex> did not handle output to the file handles as well
  69. as input from them, then there is a chance that the program could
  70. block while attempting to write.  If you let the multiplexer buffer
  71. the output, it will write the data only when the file handle is
  72. capable of receiveing it.
  73.  
  74. The basic method for handing output to the multiplexer is the C<write>
  75. method, which simply takes a file descriptor and the data to be
  76. written, like this:
  77.  
  78.     $mux->write($fh, "Some data");
  79.  
  80. For convenience, when the file handle is C<add>ed to the multiplexer, it
  81. is tied to a special class which intercepts all attempts to write to the
  82. file handle.  Thus, you can use print and printf to send output to the
  83. handle in a normal manner:
  84.  
  85.     printf $fh "%s%d%X", $foo, $bar, $baz
  86.  
  87. Unfortunately, Perl support for tied file handles is incomplete, and
  88. functions such as C<send> cannot be supported.
  89.  
  90. Also, file handle object methods such as the C<send> method of
  91. C<IO::Socket> cannot be intercepted.
  92.  
  93. =head1 EXAMPLES
  94.  
  95. =head2 Simple Example
  96.  
  97. This is a simple telnet-like program, which demonstrates the concepts
  98. covered so far.  It does not really work too well against a telnet
  99. server, but it does OK against the sample server presented further down.
  100.  
  101.     use IO::Socket;
  102.     use IO::Multiplex;
  103.  
  104.     # Create a multiplex object
  105.     my $mux  = new IO::Multiplex;
  106.     # Connect to the host/port specified on the command line,
  107.     # or localhost:23
  108.     my $sock = new IO::Socket::INET(Proto    => 'tcp',
  109.                                     PeerAddr => shift || 'localhost',
  110.                                     PeerPort => shift || 23)
  111.         or die "socket: $@";
  112.  
  113.     # add the relevant file handles to the mux
  114.     $mux->add($sock);
  115.     $mux->add(\*STDIN);
  116.     # We want to buffer output to the terminal.  This prevents the program
  117.     # from blocking if the user hits CTRL-S for example.
  118.     $mux->add(\*STDOUT);
  119.  
  120.     # We're not object oriented, so just request callbacks to the
  121.     # current package
  122.     $mux->set_callback_object(__PACKAGE__);
  123.  
  124.     # Enter the main mux loop.
  125.     $mux->loop;
  126.  
  127.     # mux_input is called when input is available on one of
  128.     # the descriptors.
  129.     sub mux_input {
  130.         my $package = shift;
  131.         my $mux     = shift;
  132.         my $fh      = shift;
  133.         my $input   = shift;
  134.  
  135.         # Figure out whence the input came, and send it on to the
  136.         # other place.
  137.         if ($fh == $sock) {
  138.             print STDOUT $$input;
  139.         } else {
  140.             print $sock $$input;
  141.         }
  142.         # Remove the input from the input buffer.
  143.         $$input = '';
  144.     }
  145.  
  146.     # This gets called if the other end closes the connection.
  147.     sub mux_close {
  148.         print STDERR "Connection Closed\n";
  149.         exit;
  150.     }
  151.  
  152. =head2 A server example
  153.  
  154. Servers are just as simple to write.  We just register a listen socket
  155. with the multiplex object C<listen> method.  It will automatically
  156. accept connections on it and add them to its list of active file handles.
  157.  
  158. This example is a simple chat server.
  159.  
  160.     use IO::Socket;
  161.     use IO::Multiplex;
  162.  
  163.     my $mux  = new IO::Multiplex;
  164.  
  165.     # Create a listening socket
  166.     my $sock = new IO::Socket::INET(Proto     => 'tcp',
  167.                                     LocalPort => shift || 2300,
  168.                                     Listen    => 4)
  169.         or die "socket: $@";
  170.  
  171.     # We use the listen method instead of the add method.
  172.     $mux->listen($sock);
  173.  
  174.     $mux->set_callback_object(__PACKAGE__);
  175.     $mux->loop;
  176.  
  177.     sub mux_input {
  178.         my $package = shift;
  179.         my $mux     = shift;
  180.         my $fh      = shift;
  181.         my $input   = shift;
  182.  
  183.         # The handles method returns a list of references to handles which
  184.         # we have registered, except for listen sockets.
  185.         foreach $c ($mux->handles) {
  186.             print $c $$input;
  187.         }
  188.         $$input = '';
  189.     }
  190.  
  191. =head2 A more complex server example
  192.  
  193. Let us take a look at the beginnings of a multi-user game server.  We will
  194. have a Player object for each player.
  195.  
  196.     # Paste the above example in here, up to but not including the
  197.     # mux_input subroutine.
  198.  
  199.     # mux_connection is called when a new connection is accepted.
  200.     sub mux_connection {
  201.         my $package = shift;
  202.         my $mux     = shift;
  203.         my $fh      = shift;
  204.  
  205.         # Construct a new player object
  206.         Player->new($mux, $fh);
  207.     }
  208.  
  209.     package Player;
  210.  
  211.     my %players = ();
  212.  
  213.     sub new {
  214.         my $package = shift;
  215.         my $self    = bless { mux  => shift,
  216.                               fh   => shift } => $package;
  217.  
  218.         # Register the new player object as the callback specifically for
  219.         # this file handle.
  220.  
  221.         $self->{mux}->set_callback_object($self, $self->{fh});
  222.         print $self->{fh}
  223.             "Greetings, Professor.  Would you like to play a game?\n";
  224.  
  225.         # Register this player object in the main list of players
  226.         $players{$self} = $self;
  227.         $mux->set_timeout($self->{fh}, 1);
  228.     }
  229.  
  230.     sub players { return values %players; }
  231.  
  232.     sub mux_input {
  233.         my $self = shift;
  234.         shift; shift;         # These two args are boring
  235.         my $input = shift;    # Scalar reference to the input
  236.  
  237.         # Process each line in the input, leaving partial lines
  238.         # in the input buffer
  239.         while ($$input =~ s/^(.*?)\n//) {
  240.             $self->process_command($1);
  241.         }
  242.     }
  243.  
  244.     sub mux_close {
  245.        my $self = shift;
  246.  
  247.        # Player disconnected;
  248.        # [Notify other players or something...]
  249.        delete $players{$self};
  250.     }
  251.     # This gets called every second to update player info, etc...
  252.     sub mux_timeout {
  253.         my $self = shift;
  254.         my $mux  = shift;
  255.  
  256.         $self->heartbeat;
  257.         $mux->set_timeout($self->{fh}, 1);
  258.     }
  259.  
  260. =head1 METHODS
  261.  
  262. =cut
  263.  
  264. use strict;
  265. use POSIX qw(errno_h BUFSIZ);
  266. use vars qw($VERSION);
  267. use Socket;
  268. use FileHandle qw(autoflush);
  269. use IO::Handle;
  270. use Fcntl;
  271. use Carp qw(carp);
  272. use constant IsWin => ($^O =~ /Win32/i);
  273.  
  274. $VERSION = '1.10';
  275.  
  276. BEGIN {
  277.     eval {
  278.         # Can optionally use Hi Res timers if available
  279.         require Time::HiRes;
  280.         Time::HiRes->import ('time');
  281.     }
  282. };
  283.  
  284. # This is what you want.  Trust me.
  285. $SIG{PIPE} = 'IGNORE';
  286.  
  287. =head2 new
  288.  
  289. Construct a new C<IO::Multiplex> object.
  290.  
  291.     $mux = new IO::Multiplex;
  292.  
  293. =cut
  294.  
  295. sub new
  296. {
  297.     my $package = shift;
  298.     my $self = bless { _readers     => '',
  299.                        _writers     => '',
  300.                        _fhs         => {},
  301.                        _handles     => {},
  302.                        _timerkeys   => {},
  303.                        _timers      => [],
  304.                        _listen      => {}  } => $package;
  305.     return $self;
  306. }
  307.  
  308. =head2 listen
  309.  
  310. Add a socket to be listened on.  The socket should have had the
  311. C<bind> and C<listen> system calls already applied to it.  The C<IO::Socket>
  312. module will do this for you.
  313.  
  314.     $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...);
  315.     $mux->listen($socket);
  316.  
  317. Connections will be automatically accepted and C<add>ed to the multiplex
  318. object.  C<The mux_connection> callback method will also be called.
  319.  
  320. =cut
  321.  
  322. sub listen
  323. {
  324.     my $self = shift;
  325.     my $fh   = shift;
  326.  
  327.     $self->add($fh);
  328.     $self->{_fhs}{"$fh"}{listen} = 1;
  329. }
  330.  
  331. =head2 add
  332.  
  333. Add a file handle to the multiplexer.
  334.  
  335.     $mux->add($fh);
  336.  
  337. As a side effect, this sets non-blocking mode on the handle, and disables
  338. STDIO buffering.  It also ties it to intercept output to the handle.
  339.  
  340. =cut
  341.  
  342. sub add
  343. {
  344.     my $self = shift;
  345.     my $fh   = shift;
  346.  
  347.     return if $self->{_fhs}{"$fh"};
  348.  
  349.     nonblock($fh);
  350.     autoflush($fh, 1);
  351.     fd_set($self->{_readers}, $fh, 1);
  352.     $self->{_fhs}{"$fh"}{udp_true} =
  353.         (SOCK_DGRAM == unpack("i", scalar getsockopt($fh,Socket::SOL_SOCKET(),Socket::SO_TYPE())));
  354.     $self->{_fhs}{"$fh"}{inbuffer} = '';
  355.     $self->{_fhs}{"$fh"}{outbuffer} = '';
  356.     $self->{_fhs}{"$fh"}{fileno} = fileno($fh);
  357.     $self->{_handles}{"$fh"} = $fh;
  358.     tie *$fh, "IO::Multiplex::Handle", $self, $fh;
  359.     return $fh;
  360. }
  361.  
  362. =head2 remove
  363.  
  364. Removes a file handle from the multiplexer.  This also unties the
  365. handle.  It does not currently turn STDIO buffering back on, or turn
  366. off non-blocking mode.
  367.  
  368.     $mux->remove($fh);
  369.  
  370. =cut
  371.  
  372. sub remove
  373. {
  374.     my $self = shift;
  375.     my $fh   = shift;
  376.     fd_set($self->{_writers}, $fh, 0);
  377.     fd_set($self->{_readers}, $fh, 0);
  378.     delete $self->{_fhs}{"$fh"};
  379.     delete $self->{_handles}{"$fh"};
  380.     $self->_removeTimer($fh);
  381.     untie *$fh;
  382. }
  383.  
  384. =head2 set_callback_object
  385.  
  386. Set the object on which callbacks are made.  If you are not using objects,
  387. you can specify the name of the package into which the method calls are
  388. to be made.
  389.  
  390. If a file handle is supplied, the callback object is specific for that
  391. handle:
  392.  
  393.     $mux->set_callback_object($object, $fh);
  394.  
  395. Otherwise, it is considered a default callback object, and is used when
  396. events occur on a file handle that does not have its own callback object.
  397.  
  398.     $mux->set_callback_object(__PACKAGE__);
  399.  
  400. The previously registered object (if any) is returned.
  401.  
  402. See also the CALLBACK INTERFACE section.
  403.  
  404. =cut
  405.  
  406. sub set_callback_object
  407. {
  408.     my $self = shift;
  409.     my $obj  = shift;
  410.     my $fh   = shift;
  411.     return if $fh && !exists($self->{_fhs}{"$fh"});
  412.  
  413.     my $old  = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object};
  414.  
  415.     $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj;
  416.     return $old;
  417. }
  418.  
  419. =head2 kill_output
  420.  
  421. Remove any pending output on a file descriptor.
  422.  
  423.     $mux->kill_output($fh);
  424.  
  425. =cut
  426.  
  427. sub kill_output
  428. {
  429.     my $self = shift;
  430.     my $fh   = shift;
  431.     return unless $fh && exists($self->{_fhs}{"$fh"});
  432.  
  433.     $self->{_fhs}{"$fh"}{outbuffer} = '';
  434.     fd_set($self->{_writers}, $fh, 0);
  435. }
  436.  
  437. =head2 outbuffer
  438.  
  439. Return or set the output buffer for a descriptor
  440.  
  441.     $output = $mux->outbuffer($fh);
  442.     $mux->outbuffer($fh, $output);
  443.  
  444. =cut
  445.  
  446. sub outbuffer
  447. {
  448.     my $self = shift;
  449.     my $fh   = shift;
  450.     return unless $fh && exists($self->{_fhs}{"$fh"});
  451.  
  452.     if (@_) {
  453.         $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_;
  454.         fd_set($self->{_writers}, $fh, 0) if !$_[0];
  455.     }
  456.  
  457.     return $self->{_fhs}{"$fh"}{outbuffer};
  458. }
  459.  
  460. =head2 inbuffer
  461.  
  462. Return or set the input buffer for a descriptor
  463.  
  464.     $input = $mux->inbuffer($fh);
  465.     $mux->inbuffer($fh, $input);
  466.  
  467. =cut
  468.  
  469. sub inbuffer
  470. {
  471.     my $self = shift;
  472.     my $fh   = shift;
  473.     return unless $fh && exists($self->{_fhs}{"$fh"});
  474.  
  475.     if (@_) {
  476.         $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_;
  477.     }
  478.  
  479.     return $self->{_fhs}{"$fh"}{inbuffer};
  480. }
  481.  
  482. =head2 set_timeout
  483.  
  484. Set the timer for a file handle.  The timeout value is a certain number of
  485. seconds in the future, after which the C<mux_timeout> callback is called.
  486.  
  487. If the C<Time::HiRes> module is installed, the timers may be specified in
  488. fractions of a second.
  489.  
  490. Timers are not reset automatically.
  491.  
  492.     $mux->set_timeout($fh, 23.6);
  493.  
  494. Use C<$mux-E<gt>set_timeout($fh, undef)> to cancel a timer.
  495.  
  496. =cut
  497.  
  498. sub set_timeout
  499. {
  500.     my $self     = shift;
  501.     my $fh       = shift;
  502.     my $timeout  = shift;
  503.     return unless $fh && exists($self->{_fhs}{"$fh"});
  504.  
  505.     if (defined $timeout) {
  506.         $self->_addTimer($fh, $timeout + time);
  507.     } else {
  508.         $self->_removeTimer($fh);
  509.     }
  510. }
  511.  
  512. =head2 handles
  513.  
  514. Returns a list of handles that the C<IO::Multiplex> object knows about,
  515. excluding listen sockets.
  516.  
  517.     @handles = $mux->handles;
  518.  
  519. =cut
  520.  
  521. sub handles
  522. {
  523.     my $self = shift;
  524.  
  525.     return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}});
  526. }
  527.  
  528. sub _addTimer {
  529.     my $self = shift;
  530.     my $fh   = shift;
  531.     my $time = shift;
  532.  
  533.     # Set a key so that we can quickly tell if a given $fh has
  534.     # a timer set
  535.     $self->{_timerkeys}{"$fh"} = 1;
  536.  
  537.     # Store the timeout in an array, and resort it
  538.     @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] );
  539. }
  540.  
  541. sub _removeTimer {
  542.     my $self = shift;
  543.     my $fh   = shift;
  544.  
  545.     # Return quickly if no timer is set
  546.     return unless exists $self->{_timerkeys}{"$fh"};
  547.  
  548.     # Remove the timeout from the sorted array
  549.     @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}};
  550.  
  551.     # Get rid of the key
  552.     delete $self->{_timerkeys}{"$fh"};
  553. }
  554.  
  555.  
  556. =head2 loop
  557.  
  558. Enter the main loop and start processing IO events.
  559.  
  560.     $mux->loop;
  561.  
  562. =cut
  563.  
  564. sub loop
  565. {
  566.     my $self = shift;
  567.     my $heartbeat = shift;
  568.     $self->{_endloop} = 0;
  569.  
  570.     while (!$self->{_endloop} && keys %{$self->{_fhs}}) {
  571.         my $rv;
  572.         my $data;
  573.         my $rdready = "";
  574.         my $wrready = "";
  575.         my $timeout = undef;
  576.  
  577.         foreach my $fh (values %{$self->{_handles}}) {
  578.             fd_set($rdready, $fh, 1) if
  579.                 ref($fh) =~ /SSL/ &&
  580.                 $fh->can("pending") &&
  581.                 $fh->pending;
  582.         }
  583.  
  584.         if (!length $rdready) {
  585.             if (@{$self->{_timers}}) {
  586.                 $timeout = $self->{_timers}[0][1] - time;
  587.             }
  588.  
  589.             my $numready = select($rdready=$self->{_readers},
  590.                                   $wrready=$self->{_writers},
  591.                                   undef,
  592.                                   $timeout);
  593.  
  594.             unless(defined($numready)) {
  595.                 if ($! == EINTR || $! == EAGAIN) {
  596.                     next;
  597.                 } else {
  598.                     last;
  599.                 }
  600.             }
  601.         }
  602.  
  603.         &{ $heartbeat } ($rdready, $wrready) if $heartbeat;
  604.  
  605.         foreach my $k (keys %{$self->{_handles}}) {
  606.             my $fh = $self->{_handles}->{$k} or next;
  607.             # Avoid creating a permanent empty hash ref for "$fh"
  608.             # by attempting to access its {object} element
  609.             # if it has already been closed.
  610.             next unless exists $self->{_fhs}{"$fh"};
  611.  
  612.             # Get the callback object.
  613.             my $obj = $self->{_fhs}{"$fh"}{object} ||
  614.                 $self->{_object};
  615.  
  616.             # Is this descriptor ready for reading?
  617.             if (fd_isset($rdready, $fh))
  618.             {
  619.                 if ($self->{_fhs}{"$fh"}{listen}) {
  620.                     # It's a server socket, so a new connection is
  621.                     # waiting to be accepted
  622.                     my $client = $fh->accept;
  623.                     next unless ($client);
  624.                     $self->add($client);
  625.                     $obj->mux_connection($self, $client)
  626.                         if $obj && $obj->can("mux_connection");
  627.                 } else {
  628.                     if ($self->is_udp($fh)) {
  629.                         $rv = recv($fh, $data, BUFSIZ, 0);
  630.                         if (defined $rv) {
  631.                             # Remember where the last UDP packet came from
  632.                             $self->{_fhs}{"$fh"}{udp_peer} = $rv;
  633.                         }
  634.                     } else {
  635.                         $rv = &POSIX::read(fileno($fh), $data, BUFSIZ);
  636.                     }
  637.  
  638.                     if (defined($rv) && length($data)) {
  639.                         # Append the data to the client's receive buffer,
  640.                         # and call process_input to see if anything needs to
  641.                         # be done.
  642.                         $self->{_fhs}{"$fh"}{inbuffer} .= $data;
  643.                         $obj->mux_input($self, $fh,
  644.                                         \$self->{_fhs}{"$fh"}{inbuffer})
  645.                             if $obj && $obj->can("mux_input");
  646.                     } else {
  647.                         unless (defined $rv) {
  648.                             next if
  649.                                 $! == EINTR ||
  650.                                 $! == EAGAIN ||
  651.                                 $! == EWOULDBLOCK;
  652.                 warn "IO::Multiplex read error: $!"
  653.                                 if $! != ECONNRESET;
  654.                         }
  655.                         # There's an error, or we received EOF.  If
  656.                         # there's pending data to be written, we leave
  657.                         # the connection open so it can be sent.  If
  658.                         # the other end is closed for writing, the
  659.                         # send will error and we close down there.
  660.                         # Either way, we remove it from _readers as
  661.                         # we're no longer interested in reading from
  662.                         # it.
  663.                         fd_set($self->{_readers}, $fh, 0);
  664.                         $obj->mux_eof($self, $fh,
  665.                                       \$self->{_fhs}{"$fh"}{inbuffer})
  666.                             if $obj && $obj->can("mux_eof");
  667.  
  668.                         if (exists $self->{_fhs}{"$fh"}) {
  669.                             delete $self->{_fhs}{"$fh"}{inbuffer};
  670.                             # The mux_eof handler could have responded
  671.                             # with a shutdown for writing.
  672.                             $self->close($fh)
  673.                                 unless exists $self->{_fhs}{"$fh"} &&
  674.                                     exists $self->{_fhs}{"$fh"}{outbuffer};
  675.                         }
  676.                         next;
  677.                     }
  678.                 }
  679.             }  # end if readable
  680.             next unless exists $self->{_fhs}{"$fh"};
  681.  
  682.             if (fd_isset($wrready, $fh)) {
  683.                 unless ($self->{_fhs}{"$fh"}{outbuffer}) {
  684.                     fd_set($self->{_writers}, $fh, 0);
  685.                     $obj->mux_outbuffer_empty($self, $fh)
  686.                         if ($obj && $obj->can("mux_outbuffer_empty"));
  687.                     next;
  688.                 }
  689.                 $rv = &POSIX::write(fileno($fh),
  690.                                     $self->{_fhs}{"$fh"}{outbuffer},
  691.                                     length($self->{_fhs}{"$fh"}{outbuffer}));
  692.                 unless (defined($rv)) {
  693.                     # We got an error writing to it.  If it's
  694.                     # EWOULDBLOCK (shouldn't happen if select told us
  695.                     # we can write) or EAGAIN, or EINTR we don't worry
  696.                     # about it.  otherwise, close it down.
  697.                     unless ($! == EWOULDBLOCK ||
  698.                             $! == EINTR ||
  699.                             $! == EAGAIN) {
  700.                         if ($! == EPIPE) {
  701.                             $obj->mux_epipe($self, $fh)
  702.                                 if $obj && $obj->can("mux_epipe");
  703.                         } else {
  704.                             warn "IO::Multiplex: write error: $!\n";
  705.                         }
  706.                         $self->close($fh);
  707.                     }
  708.                     next;
  709.                 }
  710.                 substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = '';
  711.                 unless ($self->{_fhs}{"$fh"}{outbuffer}) {
  712.                     # Mark us as not writable if there's nothing more to
  713.                     # write
  714.                     fd_set($self->{_writers}, $fh, 0);
  715.                     $obj->mux_outbuffer_empty($self, $fh)
  716.                         if ($obj && $obj->can("mux_outbuffer_empty"));
  717.  
  718.                     if ($self->{_fhs}{"$fh"}{shutdown}) {
  719.                         # If we've been marked for shutdown after write
  720.                         # do it.
  721.                         shutdown($fh, 1);
  722.                         delete $self->{_fhs}{"$fh"}{outbuffer};
  723.                         unless (exists $self->{_fhs}{"$fh"}{inbuffer}) {
  724.                             # We'd previously been shutdown for reading
  725.                             # also, so close out completely
  726.                             $self->close($fh);
  727.                             next;
  728.                         }
  729.                     }
  730.                 }
  731.             }  # End if writeable
  732.  
  733.             next unless exists $self->{_fhs}{"$fh"};
  734.  
  735.         }  # End foreach $fh (...)
  736.  
  737.         $self->_checkTimeouts() if @{$self->{_timers}};
  738.  
  739.     } # End while(loop)
  740. }
  741.  
  742. sub _checkTimeouts {
  743.     my $self = shift;
  744.  
  745.     # Get the current time
  746.     my $time = time;
  747.  
  748.     # Copy all of the timers that should go off into
  749.     # a temporary array. This allows us to modify the
  750.     # real array as we process the timers, without
  751.     # interfering with the loop.
  752.  
  753.     my @timers = ();
  754.     foreach my $timer (@{$self->{_timers}}) {
  755.         # If the timer is in the future, we can stop
  756.         last if $timer->[1] > $time;
  757.         push @timers, $timer;
  758.     }
  759.  
  760.     foreach my $timer (@timers) {
  761.         my $fh = $timer->[0];
  762.         $self->_removeTimer($fh);
  763.  
  764.         next unless exists $self->{_fhs}{"$fh"};
  765.  
  766.         my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
  767.         $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout");
  768.     }
  769. }
  770.  
  771.  
  772. =head2 endloop
  773.  
  774. Prematurly terminate the loop.  The loop will automatically terminate
  775. when there are no remaining descriptors to be watched.
  776.  
  777.     $mux->endloop;
  778.  
  779. =cut
  780.  
  781. sub endloop
  782. {
  783.     my $self = shift;
  784.     $self->{_endloop} = 1;
  785. }
  786.  
  787. =head2 udp_peer
  788.  
  789. Get peer endpoint of where the last udp packet originated.
  790.  
  791.     $saddr = $mux->udp_peer($fh);
  792.  
  793. =cut
  794.  
  795. sub udp_peer {
  796.   my $self = shift;
  797.   my $fh = shift;
  798.   return $self->{_fhs}{"$fh"}{udp_peer};
  799. }
  800.  
  801. =head2 is_udp
  802.  
  803. Sometimes UDP packets require special attention.
  804. This method will tell if a file handle is of type UDP.
  805.  
  806.     $is_udp = $mux->is_udp($fh);
  807.  
  808. =cut
  809.  
  810. sub is_udp {
  811.   my $self = shift;
  812.   my $fh = shift;
  813.   return $self->{_fhs}{"$fh"}{udp_true};
  814. }
  815.  
  816. =head2 write
  817.  
  818. Send output to a file handle.
  819.  
  820.     $mux->write($fh, "'ere I am, JH!\n");
  821.  
  822. =cut
  823.  
  824. sub write
  825. {
  826.     my $self = shift;
  827.     my $fh   = shift;
  828.     my $data = shift;
  829.     return unless $fh && exists($self->{_fhs}{"$fh"});
  830.  
  831.     if ($self->{_fhs}{"$fh"}{shutdown}) {
  832.         $! = EPIPE;
  833.         return undef;
  834.     }
  835.     if ($self->is_udp($fh)) {
  836.         if (my $udp_peer = $self->udp_peer($fh)) {
  837.             # Send the packet back to the last peer that said something
  838.             return send($fh, $data, 0, $udp_peer);
  839.         } else {
  840.             # No udp_peer yet?
  841.             # This better be a connect()ed UDP socket
  842.             # or else this will fail with ENOTCONN
  843.             return send($fh, $data, 0);
  844.         }
  845.     }
  846.     $self->{_fhs}{"$fh"}{outbuffer} .= $data;
  847.     fd_set($self->{_writers}, $fh, 1);
  848.     return length($data);
  849. }
  850.  
  851. =head2 shutdown
  852.  
  853. Shut down a socket for reading or writing or both.  See the C<shutdown>
  854. Perl documentation for further details.
  855.  
  856. If the shutdown is for reading, it happens immediately.  However,
  857. shutdowns for writing are delayed until any pending output has been
  858. successfully written to the socket.
  859.  
  860.     $mux->shutdown($socket, 1);
  861.  
  862. =cut
  863.  
  864. sub shutdown
  865. {
  866.     my $self = shift;
  867.     my $fh = shift;
  868.     my $which = shift;
  869.     return unless $fh && exists($self->{_fhs}{"$fh"});
  870.  
  871.     if ($which == 0 || $which == 2) {
  872.         # Shutdown for reading.  We can do this now.
  873.         shutdown($fh, 0);
  874.         # The mux_eof hook must be run from the main loop to consume
  875.         # the rest of the inbuffer if there is anything left.
  876.         # It will also remove $fh from _readers.
  877.     }
  878.  
  879.     if ($which == 1 || $which == 2) {
  880.         # Shutdown for writing.  Only do this now if there is no pending
  881.         # data.
  882.         if ($self->{_fhs}{"$fh"}{outbuffer}) {
  883.             $self->{_fhs}{"$fh"}{shutdown} = 1;
  884.         } else {
  885.             shutdown($fh, 1);
  886.             delete $self->{_fhs}{"$fh"}{outbuffer};
  887.         }
  888.     }
  889.     # Delete the descriptor if it's totally gone.
  890.     unless (exists $self->{_fhs}{"$fh"}{inbuffer} ||
  891.             exists $self->{_fhs}{"$fh"}{outbuffer}) {
  892.         $self->close($fh);
  893.     }
  894. }
  895.  
  896. =head2 close
  897.  
  898. Close a handle.  Always use this method to close a handle that is being
  899. watched by the multiplexer.
  900.  
  901.     $mux->close($fh);
  902.  
  903. =cut
  904.  
  905. sub close
  906. {
  907.     my $self = shift;
  908.     my $fh = shift;
  909.     return unless exists $self->{_fhs}{"$fh"};
  910.  
  911.     my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
  912.     warn "closeing with read buffer" if $self->{_fhs}{"$fh"}{inbuffer};
  913.     warn "closeing with write buffer" if $self->{_fhs}{"$fh"}{outbuffer};
  914.  
  915.     fd_set($self->{_readers}, $fh, 0);
  916.     fd_set($self->{_writers}, $fh, 0);
  917.  
  918.     delete $self->{_fhs}{"$fh"};
  919.     delete $self->{_handles}{"$fh"};
  920.     untie *$fh;
  921.     close $fh;
  922.     $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close");
  923. }
  924.  
  925. # We set non-blocking mode on all descriptors.  If we don't, then send
  926. # might block if the data is larger than the kernel can accept all at once,
  927. # even though select told us we can write.  With non-blocking mode, we
  928. # get a partial write in those circumstances, which is what we want.
  929.  
  930. sub nonblock
  931. {
  932.     return 1 if IsWin;
  933.     my $fh = shift;
  934.     my $flags = fcntl($fh, F_GETFL, 0)
  935.         or die "fcntl F_GETFL: $!\n";
  936.     fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
  937.         or die "fcntl F_SETFL $!\n";
  938. }
  939.  
  940. sub fd_set
  941. {
  942.      vec($_[0], fileno($_[1]), 1) = $_[2];
  943. }
  944.  
  945. sub fd_isset
  946. {
  947.     return vec($_[0], fileno($_[1]), 1);
  948. }
  949.  
  950. # We tie handles into this package to handle write buffering.
  951.  
  952. package IO::Multiplex::Handle;
  953.  
  954. use strict;
  955. use Tie::Handle;
  956. use Carp;
  957. use vars qw(@ISA);
  958. @ISA = qw(Tie::Handle);
  959.  
  960. sub FILENO
  961. {
  962.     my $self = shift;
  963.     return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno});
  964. }
  965.  
  966.  
  967. sub TIEHANDLE
  968. {
  969.     my $package = shift;
  970.     my $mux = shift;
  971.     my $fh  = shift;
  972.  
  973.     my $self = bless { _mux   => $mux,
  974.                        _fh    => $fh } => $package;
  975.     return $self;
  976. }
  977.  
  978. sub WRITE
  979. {
  980.     my $self = shift;
  981.     my ($msg, $len, $offset) = @_;
  982.     $offset ||= 0;
  983.     return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len));
  984. }
  985.  
  986. sub CLOSE
  987. {
  988.     my $self = shift;
  989.     return $self->{_mux}->shutdown($self->{_fh}, 2);
  990. }
  991.  
  992. sub READ
  993. {
  994.     carp "Do not read from a muxed file handle";
  995. }
  996.  
  997. sub READLINE
  998. {
  999.     carp "Do not read from a muxed file handle";
  1000. }
  1001.  
  1002. sub FETCH
  1003. {
  1004.     return "Fnord";
  1005. }
  1006.  
  1007. 1;
  1008.  
  1009. __END__
  1010.  
  1011. =head1 CALLBACK INTERFACE
  1012.  
  1013. Callback objects should support the following interface.  You do not have
  1014. to provide all of these methods, just provide the ones you are interested in.
  1015.  
  1016. All methods receive a reference to the callback object (or package) as
  1017. their first argument, in the traditional object oriented
  1018. way. References to the C<IO::Multiplex> object and the relevant file
  1019. handle are also provided.  This will be assumed in the method
  1020. descriptions.
  1021.  
  1022. =head2 mux_input
  1023.  
  1024. Called when input is ready on a descriptor.  It is passed a reference to
  1025. the input buffer.  It should remove any input that it has consumed, and
  1026. leave any partially received data in the buffer.
  1027.  
  1028.     sub mux_input {
  1029.         my $self = shift;
  1030.         my $mux  = shift;
  1031.         my $fh   = shift;
  1032.         my $data = shift;
  1033.  
  1034.         # Process each line in the input, leaving partial lines
  1035.         # in the input buffer
  1036.         while ($$data =~ s/^(.*?\n)//) {
  1037.             $self->process_command($1);
  1038.         }
  1039.     }
  1040.  
  1041. =head2 mux_eof
  1042.  
  1043. This is called when an end-of-file condition is present on the descriptor.
  1044. This is does not nessecarily mean that the descriptor has been closed, as
  1045. the other end of a socket could have used C<shutdown> to close just half
  1046. of the socket, leaving us free to write data back down the still open
  1047. half.  Like mux_input, it is also passed a reference to the input buffer.
  1048. It should consume the entire buffer or else it will just be lost.
  1049.  
  1050. In this example, we send a final reply to the other end of the socket,
  1051. and then shut it down for writing.  Since it is also shut down for reading
  1052. (implicly by the EOF condition), it will be closed once the output has
  1053. been sent, after which the mux_close callback will be called.
  1054.  
  1055.     sub mux_eof {
  1056.         my $self = shift;
  1057.         my $mux  = shift;
  1058.         my $fh   = shift;
  1059.  
  1060.         print $fh "Well, goodbye then!\n";
  1061.         $mux->shutdown($fh, 1);
  1062.     }
  1063.  
  1064. =head2 mux_close
  1065.  
  1066. Called when a handle has been completely closed.  At the time that
  1067. C<mux_close> is called, the handle will have been removed from the
  1068. multiplexer, and untied.
  1069.  
  1070. =head2 mux_outbuffer_empty
  1071.  
  1072. Called after all pending output has been written to the file descriptor.
  1073.  
  1074. =head2 mux_connection
  1075.  
  1076. Called upon a new connection being accepted on a listen socket.
  1077.  
  1078. =head2 mux_timeout
  1079.  
  1080. Called when a timer expires.
  1081.  
  1082. =head1 AUTHOR
  1083.  
  1084. Copyright 1999 Bruce J Keeler <bruce@gridpoint.com>
  1085.  
  1086. Copyright 2001-2008 Rob Brown <bbb@cpan.org>
  1087.  
  1088. Released under the same terms as Perl itself.
  1089.  
  1090. $Id: Multiplex.pm,v 1.36 2008/09/15 08:17:50 rob Exp $
  1091.  
  1092. =cut
  1093.